SQSとLambdaで実装する直列処理
CX事業本部の夏目です。
今回は、SQSとLambdaを使って直列処理を実装する方法について紹介します。
方法
- Lambda Functionの予約された同時実行数を
1
に制限する - Lambda FunctionのトリガーとしてSQSを指定する
これだけで簡単にLambdaを直列に動かすことができます。
通常のSQSのキューではキューに入った順序で実行されるとは限りませんが、FIFOキューを使えば先入れ先出しの順序で処理することができます。
また、FIFOキューを使っていてLambda Functionがエラー終了した場合でも、先入れ先出し順序は保たれます。
まとめ
S3のファイルを扱うときなど、直列的に処理を行う必要がある場合に使うことができると思います。
今回検証した内容では、同時実行数1
でもLambdaでスロットルが発生しない規模のものを想定しています。
実際には1件処理するのにかかる時間やSQSが一度に受け取ることができるメッセージの上限などの制約が存在するため、一度に大量のメッセージが飛んできた場合など、うまく処理が走らない場合があるのでご注意ください。
説明 / 検証
なんでLambda Functionの予約された同時実行数を 1
に制限するの?
SQSトリガーのLambdaはSQSに入っているデータを一気に処理できるよう、最大限にスケールして動きます。
例えば、バッチサイズが3
でキューに50個
データが有る場合、Lambdaが 17個並列で起動します。
予約された同時実行数を1
にしないと、Lambdaが複数起動されてしまい直列に処理ができなくなります。
FIFOキューをトリガーに使う際、Lambdaでエラーが起きても順序は保証されるの?
これは実際にLambdaを動かして確認してみました。
FIFOキューの準備
SQSのFIFOキューを作成します。
lambda_serial_test.fifo
という名前を指定して、ここではキューのクイック作成
をクリックして作成します。
作成したキューに対して、キュー操作
のメッセージを送信
から事前に複数のメッセージをキューに入れておきます。
5件のメッセージを準備しました。
Lambda関数の準備
次にLambdaを作っていきます。
FIFOキューと同様に、sqs_serial_test
という名前でPython3.7
を使用するLambdaを作成します。
SQSトリガーのLambdaになるので、ポリシーテンプレートからSQSのポーリングアクセス権限
を付与しておきます。
作成したら、Lambdaのコードを書き換え、保存します。
from random import randrange import json RANGE_MAX: int = 100 THRESHOLD: int = 50 TAG = "AA635220-6123-42A9-9E8F-7142A8D126E4" def puts(data: str, flag: bool): message = f"{TAG} ({'success' if flag else ' failed'}): {data}" print(message) def lambda_handler(event, context): text = event["Records"][0]["body"] flag = randrange(RANGE_MAX) > THRESHOLD puts(text, flag) if not flag: raise Exception()
ざっくり何をしているかと言うと、 - 50%の確率で例外を投げる (エラー終了する) - 正常に終了するのか、エラー終了するのか、またその際のキューの中身は何か、をログに出力する
次に、事前に作成しデータも入れておいたFIFOキューをトリガーに設定します。
バッチサイズを1
にして、1件ずつ処理するようにします。
トリガーの有効化にチェックが入った状態で追加して、すぐに動かします。
しばらく待ってから、CloudWatch LogsにあるLambdaのログを見に行きます。
/aws/lambda/sqs_serial_test
のロググループを開き、ログストリームを開きます。
検索窓(イベントをフィルター
とプレースホルダーが見えるテキストボックス)に "AA635220-6123-42A9-9E8F-7142A8D126E4"
と入力し、エンターを押して検索します。
見ると、4th
というデータのときにLambdaがエラー終了していることがわかります。
しかし、その次が5th
ではなく、もう一度4th
というデータを取り出しています。
このことから、Lambdaがエラー終了してもFIFOキューの順番は保証されていることがわかります。